{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-data-dependency-steps.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Showcasing DataPath and PipelineParameter\n",
        "\n",
        "This notebook demonstrateas the use of [**DataPath**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapath?view=azure-ml-py) and [**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) in AML Pipeline. You will learn how strings and [**DataPath**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapath?view=azure-ml-py) can be parameterized and submitted to AML Pipelines via [**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py).\n",
        "To see more about how parameters work between steps, please refer [aml-pipelines-with-data-dependency-steps](https://aka.ms/pl-data-dep).\n",
        "\n",
        "* [How to create a Pipeline with a DataPath PipelineParameter](#index1)\n",
        "* [How to submit a Pipeline with a DataPath PipelineParameter](#index2)\n",
        "* [How to submit a Pipeline and change the DataPath PipelineParameter value from the sdk](#index3)\n",
        "* [How to submit a Pipeline and change the DataPath PipelineParameter value using a REST call](#index4)\n",
        "* [How to create a datastore trigger schedule and use the data_path_parameter_name to get the path of the changed blob in the Pipeline](#index5)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Azure Machine Learning and Pipeline SDK-specific imports"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import azureml.core\n",
        "from azureml.core import Workspace, Experiment\n",
        "from azureml.core.compute import ComputeTarget, AmlCompute\n",
        "from azureml.data.datapath import DataPath, DataPathComputeBinding\n",
        "from azureml.widgets import RunDetails\n",
        "\n",
        "from azureml.pipeline.core import PipelineParameter\n",
        "from azureml.pipeline.core import Pipeline, PipelineRun\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Initialize Workspace\n",
        "\n",
        "Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure the config file is present at .\\config.json\n",
        "\n",
        "If you don't have a config.json file, go through the [configuration Notebook](https://aka.ms/pl-config) first.\n",
        "\n",
        "This sets you up with a working config file that has information on your workspace, subscription id, etc."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create an Azure ML experiment\n",
        "\n",
        "Let's create an experiment named \"showcasing-datapath\" and a folder to hold the training scripts. The script runs will be recorded under the experiment in Azure."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Choose a name for the run history container in the workspace.\n",
        "experiment_name = 'showcasing-datapath'\n",
        "source_directory  = '.'\n",
        "\n",
        "experiment = Experiment(ws, experiment_name)\n",
        "experiment"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create or Attach an AmlCompute cluster\n",
        "You will need to create a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for your AutoML run. In this tutorial, you get the default `AmlCompute` as your training compute resource.\n",
        "\n",
        "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Choose a name for your cluster.\n",
        "amlcompute_cluster_name = \"cpu-cluster\"\n",
        "\n",
        "found = False\n",
        "# Check if this compute target already exists in the workspace.\n",
        "cts = ws.compute_targets\n",
        "if amlcompute_cluster_name in cts and cts[amlcompute_cluster_name].type == 'AmlCompute':\n",
        "    found = True\n",
        "    print('Found existing compute target.')\n",
        "    compute_target = cts[amlcompute_cluster_name]\n",
        "    \n",
        "if not found:\n",
        "    print('Creating a new compute target...')\n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\", # for GPU, use \"Standard_NC6s_v3\"\n",
        "                                                                #vm_priority = 'lowpriority', # optional\n",
        "                                                                max_nodes = 4)\n",
        "\n",
        "    # Create the cluster.\n",
        "    compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, provisioning_config)\n",
        "    \n",
        "    # Can poll for a minimum number of nodes and for a specific timeout.\n",
        "    # If no min_node_count is provided, it will use the scale settings for the cluster.\n",
        "    compute_target.wait_for_completion(show_output = True, timeout_in_minutes = 10)\n",
        "    \n",
        "     # For a more detailed view of current AmlCompute status, use get_status()."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Data and arguments setup \n",
        "\n",
        "We will setup a trining script to run and its arguments to be used. The sample training script below will print the two arguments to show what has been passed to pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%%writefile train_with_datapath.py\n",
        "import argparse\n",
        "import os\n",
        "\n",
        "parser = argparse.ArgumentParser(\"train\")\n",
        "parser.add_argument(\"--arg1\", type=str, help=\"sample string argument\")\n",
        "parser.add_argument(\"--arg2\", type=str, help=\"sample datapath argument\")\n",
        "args = parser.parse_args()\n",
        "\n",
        "print(\"Sample string argument  : %s\" % args.arg1)\n",
        "print(\"Sample datapath argument: %s\" % args.arg2)\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Let's setup string and DataPath arguments using PipelineParameter. \n",
        "\n",
        "Note that Pipeline accepts a tuple of the form ([**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) , [**DataPathComputeBinding**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapathcomputebinding?view=azure-ml-py)) as an input. DataPath defines the location of input data. DataPathComputeBinding defines how the data is consumed during step execution. The DataPath can be modified at pipeline submission time with a DataPath parameter, while the compute binding does not change. For static data inputs, we use [**DataReference**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.data_reference.datareference?view=azure-ml-py) which defines both the data location and compute binding."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "datapath-remarks-sample"
        ]
      },
      "outputs": [],
      "source": [
        "def_blob_store = ws.get_default_datastore()\n",
        "print(\"Default datastore's name: {}\".format(def_blob_store.name))\n",
        "\n",
        "data_path = DataPath(datastore=def_blob_store, path_on_datastore='sample_datapath1')\n",
        "datapath1_pipeline_param = PipelineParameter(name=\"input_datapath\", default_value=data_path)\n",
        "datapath_input = (datapath1_pipeline_param, DataPathComputeBinding(mode='mount'))\n",
        "\n",
        "string_pipeline_param = PipelineParameter(name=\"input_string\", default_value='sample_string1')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<a id='index1'></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create a Pipeline with a DataPath PipelineParameter\n",
        "\n",
        "Note that the ```datapath_input``` is specified on both arguments and inputs to create a step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "train_step = PythonScriptStep(\n",
        "    name='train_step',\n",
        "    script_name=\"train_with_datapath.py\",\n",
        "    arguments=[\"--arg1\", string_pipeline_param, \"--arg2\", datapath_input],\n",
        "    inputs=[datapath_input],\n",
        "    compute_target=compute_target, \n",
        "    source_directory=source_directory)\n",
        "print(\"train_step created\")\n",
        "\n",
        "pipeline = Pipeline(workspace=ws, steps=[train_step])\n",
        "print(\"pipeline with the train_step created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<a id='index2'></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Submit a Pipeline with a DataPath PipelineParameter\n",
        "\n",
        "Pipelines can be submitted with default values of PipelineParameters by not specifying any parameters."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run = experiment.submit(pipeline)\n",
        "print(\"Pipeline is submitted for execution\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<a id='index3'></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Submit a Pipeline and change the DataPath PipelineParameter value from the sdk\n",
        "\n",
        "Or Pipelines can be submitted with values other than default ones by using pipeline_parameters. "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run_with_params = experiment.submit(pipeline, \\\n",
        "        pipeline_parameters={'input_datapath': DataPath(datastore=def_blob_store, path_on_datastore='sample_datapath2'),\n",
        "                         'input_string': 'sample_string2'}) "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "RunDetails(pipeline_run_with_params).show()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run_with_params.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<a id='index4'></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Submit a Pipeline and change the DataPath PipelineParameter value using a REST call\n",
        "\n",
        "Let's published the pipeline to use the rest endpoint of the published pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline = pipeline.publish(name=\"DataPath_Pipeline\", description=\"Pipeline to test Datapath\", continue_on_step_failure=True)\n",
        "published_pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.authentication import InteractiveLoginAuthentication\n",
        "import requests\n",
        "\n",
        "auth = InteractiveLoginAuthentication()\n",
        "aad_token = auth.get_authentication_header()\n",
        "\n",
        "rest_endpoint = published_pipeline.endpoint\n",
        "\n",
        "print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# specify the param when running the pipeline\n",
        "response = requests.post(rest_endpoint, \n",
        "                         headers=aad_token, \n",
        "                         json={\"ExperimentName\": \"MyRestPipeline\",\n",
        "                               \"RunSource\": \"SDK\",\n",
        "                               \"DataPathAssignments\": {\n",
        "                                   \"input_datapath\": { \n",
        "                                       \"DataStoreName\": def_blob_store.name,\n",
        "                                       \"RelativePath\": 'sample_datapath3'\n",
        "                                   }\n",
        "                               },\n",
        "                               \"ParameterAssignments\": {\"input_string\": \"sample_string3\"}\n",
        "                              }\n",
        "                        )"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "try:\n",
        "    response.raise_for_status()\n",
        "except Exception:    \n",
        "    raise Exception('Received bad response from the endpoint: {}\\n'\n",
        "                    'Response Code: {}\\n'\n",
        "                    'Headers: {}\\n'\n",
        "                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))\n",
        "\n",
        "run_id = response.json().get('Id')\n",
        "print('Submitted pipeline run: ', run_id)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline_run_via_rest = PipelineRun(ws.experiments[\"MyRestPipeline\"], run_id)\n",
        "RunDetails(published_pipeline_run_via_rest).show()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline_run_via_rest.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<a id='index5'></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create a Datastore trigger schedule and use data path parameter\n",
        "\n",
        "When the Pipeline is scheduled with DataPath parameter, it will be triggered by the modified or added data in the DataPath. ```path_on_datastore``` should be a folder and the value of the DataPath will be replaced by the path of the modified data."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import Schedule\n",
        "\n",
        "schedule = Schedule.create(workspace=ws, \n",
        "                           name=\"Datastore_trigger_schedule\",\n",
        "                           pipeline_id=published_pipeline.id, \n",
        "                           experiment_name='Scheduled_Pipeline',\n",
        "                           datastore=def_blob_store,\n",
        "                           wait_for_provisioning=True,\n",
        "                           description=\"Datastore trigger schedule demo\",\n",
        "                           path_on_datastore=\"sample_datapath_for_folder\",\n",
        "                           data_path_parameter_name=\"input_datapath\") #Same name as used above to create PipelineParameter\n",
        "\n",
        "print(\"Created schedule with id: {}\".format(schedule.id))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "schedule.disable()\n",
        "schedule"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "shbijlan"
      }
    ],
    "category": "tutorial",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "How to use DataPath as a PipelineParameter",
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "order_index": 13,
    "star_tag": [
      "featured"
    ],
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of DataPath as a PipelineParameter"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}